multiprocessing
The built-in module multiprocessing provides functionality to create processes which run given tasks.
http://docs.python.org/2/library/multiprocessing.html
All strategies for parallelization in Python have a rather large overhead compared to lower-level languages such as C or FORTRAN.
The way multiprocessing runs code in parallel is by launching subprocesses with a separate interpreter for each process. This means that in order to gain any speed, the computation we want to perform should be relatively substantial.
(In case you are familiar with threads: it should be noted that Python has a threading module for working with threads; however, because of the global interpreter lock all threads will run on a single CPU.)
By using multiprocessing we can utilize the machines we run our code on more efficiently.
In [1]:
import multiprocessing
In [2]:
multiprocessing.cpu_count()
Out[2]:
Before talking about some more advanced features, let's describe the most typical use pattern of multiprocessing.
Note: multiprocessing can be used in the IPython Notebook, but there are sometimes issues with printing from subprocesses. To make things clearer and avoid complications we shall run external scripts instead.
Process
Processes share nothing
To spawn a process, initiate it with a target function and call its .start() method.
This method will arrange things so that the given code is run in a separate process from the parent process. To get the parent process to wait until a process has finished before moving on, one needs to call its .join() method.
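As a minimal sketch of this start/join pattern (not one of the numbered cells in this notebook):

from multiprocessing import Process

def worker():
    print("hello from the child process")

if __name__ == "__main__":
    proc = Process(target=worker)
    proc.start()  # launch the child process
    proc.join()   # wait here until the child process has finished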
In [4]:
import os
os.getpid()
Out[4]:
In [7]:
%%file mp.py
from multiprocessing import Process
import os
def worker():
    print("Worker process {}".format(os.getpid()))

if __name__ == "__main__":
    proc1 = Process(target=worker)
    proc1.start()
    proc2 = Process(target=worker)
    proc2.start()
In [8]:
%%bash
python mp.py
To get the target function to actually work on some input, you need to provide the arguments in the constructor of the Process class.
In [9]:
%%file mp.py
from multiprocessing import Process
import os
def worker(arg):
    print("Worker process {}, argument was {}".format(os.getpid(), arg))

if __name__ == "__main__":
    proc1 = Process(target=worker, args=(10,))
    proc1.start()
    proc2 = Process(target=worker, args=(11,))
    proc2.start()
In [10]:
%%bash
python mp.py
Processes communicate over interprocess communication channels, such as
Queue
Pipe
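The Queue is demonstrated in the example below. A Pipe is not used in these notes, but a minimal sketch of it could look like this (both ends of the pipe can send and receive picklable objects):

from multiprocessing import Process, Pipe
import os

def worker(conn):
    # send a message back to the parent through one end of the pipe
    conn.send("hello from process {}".format(os.getpid()))
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn,))
    proc.start()
    print(parent_conn.recv())  # blocks until the child has sent something
    proc.join()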
In [11]:
%%file mp2.py
from multiprocessing import Process, Queue
import os
def worker(tasks, results):
    # take one task from the task queue and put the answer on the result queue
    t = tasks.get()
    result = t * 2
    results.put([os.getpid(), t, "->", result])

if __name__ == "__main__":
    n = 20
    my_tasks = Queue()
    my_results = Queue()
    # one worker process per task
    workers = [Process(target=worker, args=(my_tasks, my_results)) for i in range(n)]
    for proc in workers:
        proc.start()
    for i in range(n):
        my_tasks.put(i)
    for i in range(n):
        result = my_results.get()
        print(result)
In [12]:
%%bash
python mp2.py
Because the processes are executed in parallel, we can never know the order in which the results are put in the Queue.
In [13]:
from multiprocessing import Queue
In [14]:
q = Queue()
In [15]:
q.get?
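The docstring shows that get() blocks by default until an item is available, and that it accepts block and timeout arguments. A quick sketch of the behaviour (continuing with the q created above; in Python 2 the Empty exception lives in the standard Queue module):

from Queue import Empty

q.put(1)
print(q.get())                      # blocks until an item is available; prints 1
try:
    q.get(block=True, timeout=0.5)  # waits at most half a second for a new item
except Empty:
    print("nothing arrived within the timeout")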
In [16]:
%%file mp3.py
from multiprocessing import Process, Manager
import os
def worker(l):
    # each worker writes its pid into the shared list, at a position given by
    # the last two digits of the pid
    p = os.getpid()
    l[int(str(p)[-2:])] = p

if __name__ == "__main__":
    n = 100
    manager = Manager()
    # a Manager provides objects, such as this list, that can be shared
    # and modified by several processes
    l = manager.list()
    l.extend([0] * n)
    processes = [Process(target=worker, args=(l,)) for i in range(20)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(l)
In [17]:
%%bash
python mp3.py
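The Manager can hand out other shared objects besides lists, for example dictionaries. A minimal sketch following the same pattern as mp3.py (not one of the notebook cells):

from multiprocessing import Process, Manager
import os

def worker(d):
    # each worker records its own pid in the shared dictionary
    d[os.getpid()] = "done"

if __name__ == "__main__":
    manager = Manager()
    d = manager.dict()          # a dictionary shared between processes
    processes = [Process(target=worker, args=(d,)) for i in range(4)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(dict(d))              # copy into an ordinary dict and print it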
In [20]:
%%file mp.py
import multiprocessing
import os
def task(args):
    print("Running process {} with args {}".format(os.getpid(), args))
    return os.getpid(), args

if __name__ == "__main__":
    # a pool of 4 worker processes; map distributes the 12 tasks among them
    pool = multiprocessing.Pool(processes=4)
    result = pool.map(task, [1, 2, 3, 4] * 3)
    print(result)
In [21]:
%%bash
python mp.py
The method .map() works like the built-in function map(), but will send data from the iterable to different processes. By default it will send one element at a time, but this can be changed with the chunksize parameter, as sketched below.
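For example, inside the mp.py script above one could pass the work to the pool in batches (the chunk size of 3 is an arbitrary choice):

result = pool.map(task, range(30), chunksize=3)  # each process receives the work in batches of 3 elements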
A similar method called .map_async() usually performs better in parallel, but in that case one has to fetch the results using the .get() method of the value returned by .map_async() (which is an instance of the class AsyncResult).
In [22]:
%%file mp.py
import multiprocessing
import os
def task(args):
    print("Running process {} with args {}".format(os.getpid(), args))
    return os.getpid(), args

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    # map_async returns immediately; .get() on the AsyncResult waits for the results
    result = pool.map_async(task, [1, 2, 3, 4])
    print(result.get())
In [23]:
%%bash
python mp.py
IPython parallel
http://ipython.org/ipython-doc/dev/parallel/
The strategy for achieving parallelization in IPython Parallel differs from multiprocessing in that you need to start engine processes and a hub process before you run any code.
One then connects to the hub, which will aid in the distribution of the work across the engines.
When IPython is installed, a program called ipcluster is also installed, which simplifies starting the engines and the hub. To start a cluster with 4 workers one would run
$ ipcluster start -n 4
In the IPython Notebook, clusters can also be started from the IPython Dashboard.
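When you are done with it, the cluster can be shut down again with

$ ipcluster stop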
In [24]:
from IPython.parallel import Client
Let's try initiating the client before we have started the cluster!
In [25]:
cli = Client()
IPython keeps track of the addresses of local engines in the .ipython directory in your home folder, but Client() can also take a URL or a path to a configuration file with information on how to connect to the cluster.
Now start the cluster (either in a terminal or from the Dashboard).
In [26]:
cli = Client()
In [27]:
cli.ids
Out[27]:
The ids field lists the engines in the cluster.
Say we want to have a look at the pid, like we have done in all the multiprocessing examples.
In [28]:
def get_pid():
    import os
    return os.getpid()
In [29]:
cli[0].apply_sync(get_pid)
Out[29]:
In [30]:
get_pid()
Out[30]:
We need to import os inside the function we send to the engine: that engine was started without any modules imported. Every time we want the engines to have some data, we need to explicitly send it to them, or send instructions on how to obtain it, as in the case of import.
There is a method for syncing imports across engines, which we will use further down.
In [33]:
cli[:].apply_sync(get_pid)
Out[33]:
A collection of engines is referred to as an engine pool; the standard view of an engine pool is called a direct view.
In [34]:
dview = cli[:]
The direct view provides a decorator which can be used to parallelize any function.
In [35]:
@dview.parallel(block=True)
def pstr(x):
    return str(x)
After the .parallel decorator has been used on a function, it is changed in such a way that arguments given to the function will be split up, sent to the engines, and the results put together again after the calculation is done.
The new ParallelFunction also has a .map() method which will run the function for each of the values in an iterable on different engines, and return the list of results when it is done.
In [36]:
str(range(10))
Out[36]:
In [37]:
pstr(range(10))
Out[37]:
In [38]:
pstr.map(range(10))
Out[38]:
In [39]:
from numpy import random
In [40]:
@dview.parallel(block=True)
def task(delay):
    import os, time
    t0 = time.time()
    pid = os.getpid()
    time.sleep(delay)
    t1 = time.time()
    return [pid, t0, t1]
In [43]:
delays = random.rand(4)
In [44]:
task.map(delays)
Out[44]:
In [45]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

def visualize_tasks(results):
    # each result is [pid, start time, end time]; draw one green bar per task,
    # grouped by the pid of the engine that ran it
    res = np.array(results)
    fig, ax = plt.subplots(figsize=(10, res.shape[1]))
    yticks = []
    yticklabels = []
    tmin = min(res[:, 1])
    for n, pid in enumerate(np.unique(res[:, 0])):
        yticks.append(n)
        yticklabels.append("%d" % pid)
        for m in np.where(res[:, 0] == pid)[0]:
            ax.add_patch(Rectangle((res[m, 1] - tmin, n - 0.25),
                                   res[m, 2] - res[m, 1], 0.5,
                                   color="green", alpha=0.5))
    ax.set_ylim(-.5, n + .5)
    ax.set_xlim(0, max(res[:, 2]) - tmin + 0.)
    ax.set_yticks(yticks)
    ax.set_yticklabels(yticklabels)
    ax.set_ylabel("PID")
    ax.set_xlabel("seconds")
In [46]:
delays = random.rand(100) / 4.
In [47]:
result = task.map(delays)
In [48]:
visualize_tasks(result)
There are more views of engine pools; one of them is the load_balanced_view, which automatically helps utilize engines that would otherwise sit around idle waiting for other engines to finish.
In [49]:
lbview = cli.load_balanced_view()
In [50]:
@lbview.parallel(block=True)
def lb_task(delay):
    import os, time
    t0 = time.time()
    pid = os.getpid()
    time.sleep(delay)
    t1 = time.time()
    return [pid, t0, t1]
In [51]:
result = lb_task.map(delays)
In [52]:
visualize_tasks(result)
(Load balancing visualization example from http://nbviewer.ipython.org/urls/raw.github.com/jrjohansson/scientific-python-lectures/master/Lecture-6B-HPC.ipynb )
To make sure that the engines have the same modules loaded, one can use the .sync_imports() method of an engine pool view.
In [53]:
with dview.sync_imports():
    import numpy
    import os
In [54]:
%%px
os.getpid()
Since the engines do not have the same namespace as the interpreter that uses them, if we want them to have the same data to work on, one needs to send that data to the engines.
This is done with the .push() method of an engine pool view.
In [55]:
dview.push(dict(a=np.arange(5), b=np.zeros(5)), block=True)
Out[55]:
In [56]:
dview.pull("a", block=True)
Out[56]:
For the sake of convenience, pushing to and pulling from the namespaces on the engines can also be done through a dictionary-like syntax.
In [57]:
dview["a"]
Out[57]:
In [58]:
dview["c"] = "See"
In [59]:
dview.pull("c", block=True)
Out[59]:
Something which might be more useful is to push different sections of data to various engines, just like when we applied the parallel function above.
The direct view has the methods .scatter() and .gather() for this.
In [60]:
dview.scatter('x', np.arange(95), block=True)
This will divide the array into four sections and distribute them to the different engines.
In [61]:
%%px
x
In [62]:
def x_mean():
    return x.mean()
In [63]:
dview.apply_sync(x_mean)
Out[63]:
In [64]:
dview.gather("x", targets=1, block=True)
Out[64]:
As an alternative to importing modules in the definition of a function, there is a @require decorator in IPython.parallel which will cause the required module to be imported before the code in the function is executed.
In [65]:
from IPython.parallel import require
In [66]:
@require("os")
def get_pid():
    return os.getpid()
In [67]:
dview.apply_sync(get_pid)
Out[67]:
In [ ]:
def factorize(n):
    # return the list of prime factors of n (with multiplicity) by trial division
    if n < 2:
        return []
    factors = []
    p = 2
    while True:
        if n == 1:
            return factors
        r = n % p
        if r == 0:
            # p divides n: record it and divide it out
            factors.append(p)
            n = n // p
        elif p * p >= n:
            # no factor up to sqrt(n), so the remaining n is prime
            factors.append(n)
            return factors
        elif p > 2:
            # after 2, only try odd candidates
            p += 2
        else:
            p += 1
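As a quick sanity check of the function (values worked out by hand):

print(factorize(12))            # [2, 2, 3]
print(len(set(factorize(12))))  # 2 unique factors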
The task is to factorize all numbers from 2 to 500000 and count the number of unique factors of each. Then count how many times each factor count occurs. That is, make a histogram of factor counts.
For the sake of clarity, this is what the task would be for the numbers up to 10:
number  factors    unique     num_unique_factors
2       [2]        [2]        1
3       [3]        [3]        1
4       [2, 2]     [2]        1
5       [5]        [5]        1
6       [2, 3]     [2, 3]     2
7       [7]        [7]        1
8       [2, 2, 2]  [2]        1
9       [3, 3]     [3]        1
10      [2, 5]     [2, 5]     2
Which gives us
{1: 7, 2: 2}
First implement this serially, that is, how you normally would. (It should take on the order of 15 seconds for 500000 numbers.)
Then make an implementation which uses multiprocessing for the parallelization (the strategy should be to factor different subsets of the numbers in different processes; don't worry about the implementation of the factoring function).
Implement the same thing using IPython.parallel. (Remember that you need to start the ipcluster before running a script that uses IPython.parallel; this is something I have a tendency to forget.)
For this assignment, put everything in one script. Call the script num_factors.py. Put it in the scripts directory of your repository.
The script should take a character as an argument that determines how it will be run. So to run it serially, you run
$ num_factors.py s
With multiprocessing
$ num_factors.py m
and with IPython.parallel
$ num_factors.py i
The script should print a dictionary like the one in the example above.
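One possible skeleton for the argument handling (a sketch; the three helper functions are just suggested names for the implementations you write yourself):

import sys

def run_serial():
    raise NotImplementedError             # your serial implementation goes here

def run_multiprocessing():
    raise NotImplementedError             # your multiprocessing implementation goes here

def run_ipython_parallel():
    raise NotImplementedError             # your IPython.parallel implementation goes here

if __name__ == "__main__":
    mode = sys.argv[1]                    # "s", "m" or "i"
    runners = {"s": run_serial, "m": run_multiprocessing, "i": run_ipython_parallel}
    print(runners[mode]())                # each runner should return the histogram dictionary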
In [ ]: